{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b2a39e99",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | default_exp _components.benchmarking"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a6aec7a7",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "from collections import deque\n",
    "from datetime import datetime, timedelta\n",
    "from statistics import mean\n",
    "from typing import *\n",
    "\n",
    "from fastkafka._components.logger import get_logger"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d9f3e46b",
   "metadata": {},
   "outputs": [],
   "source": [
    "from fastkafka._components.logger import suppress_timestamps"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "acb8be86",
   "metadata": {},
   "outputs": [],
   "source": [
    "suppress_timestamps()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "79f00686",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "logger = get_logger(\"fastkafka.benchmark\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "518b9b49",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "def _benchmark(\n",
    "    interval: Union[int, timedelta] = 1,\n",
    "    *,\n",
    "    sliding_window_size: Optional[int] = None,\n",
    "    func_name: str,\n",
    "    benchmark_results: Dict[str, Dict[str, Any]],\n",
    ") -> None:\n",
    "    \"\"\"Used to record the benchmark results(throughput, average throughput, standard deviation) of a given function\n",
    "\n",
    "    Args:\n",
    "        interval: the time interval after which the benchmark results are logged.\n",
    "        sliding_window_size: the maximum number of benchmark results to use to calculate average throughput and standard deviation.\n",
    "        func_name: the name of the function to be benchmarked.\n",
    "        benchmark_results: a dictionary containing the benchmark results of all functions.\n",
    "    \"\"\"\n",
    "    if isinstance(interval, int):\n",
    "        interval = timedelta(seconds=interval)\n",
    "    if func_name not in benchmark_results:\n",
    "        benchmark_results[func_name] = {\n",
    "            \"count\": 0,\n",
    "            \"last_count\": 0,\n",
    "            \"start\": None,\n",
    "            \"last_start\": None,\n",
    "            \"history\": [],\n",
    "        }\n",
    "        if sliding_window_size is not None:\n",
    "            benchmark_results[func_name][\"history\"] = deque(maxlen=sliding_window_size)\n",
    "\n",
    "    benchmark_results[func_name][\"count\"] += 1\n",
    "\n",
    "    if benchmark_results[func_name][\"count\"] == 1:\n",
    "        benchmark_results[func_name][\"start\"] = benchmark_results[func_name][\n",
    "            \"last_start\"\n",
    "        ] = datetime.utcnow()\n",
    "\n",
    "    diff = datetime.utcnow() - benchmark_results[func_name][\"last_start\"]\n",
    "    if diff >= interval:\n",
    "        throughput = (\n",
    "            benchmark_results[func_name][\"count\"]\n",
    "            - benchmark_results[func_name][\"last_count\"]\n",
    "        ) / (diff / timedelta(seconds=1))\n",
    "        log_msg = f\"Throughput = {throughput:5,.0f}\"\n",
    "\n",
    "        if sliding_window_size is not None:\n",
    "            benchmark_results[func_name][\"history\"].append(throughput)\n",
    "\n",
    "            log_msg += f\", Avg throughput = {mean(benchmark_results[func_name]['history']):5,.0f}\"\n",
    "        #             if len(benchmark_results[func_name][\"history\"]) > 1:\n",
    "        #                 log_msg += f\", Standard deviation of throughput is {stdev(benchmark_results[func_name]['history']):5,.0f}\"\n",
    "        log_msg = (\n",
    "            log_msg\n",
    "            + f\" - For {func_name}(interval={interval.seconds},{sliding_window_size=})\"\n",
    "        )\n",
    "        logger.info(log_msg)\n",
    "\n",
    "        benchmark_results[func_name][\"last_start\"] = datetime.utcnow()\n",
    "        benchmark_results[func_name][\"last_count\"] = benchmark_results[func_name][\n",
    "            \"count\"\n",
    "        ]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0d027a73",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] fastkafka.benchmark: Throughput = 2,401,289, Avg throughput = 2,401,289 - For dummy_func(interval=1,sliding_window_size=5)\n",
      "[INFO] fastkafka.benchmark: Throughput = 2,532,538, Avg throughput = 2,466,914 - For dummy_func(interval=1,sliding_window_size=5)\n",
      "[INFO] fastkafka.benchmark: Throughput = 2,523,505, Avg throughput = 2,485,777 - For dummy_func(interval=1,sliding_window_size=5)\n",
      "[INFO] fastkafka.benchmark: Throughput = 2,466,875, Avg throughput = 2,481,052 - For dummy_func(interval=1,sliding_window_size=5)\n",
      "[INFO] fastkafka.benchmark: Throughput = 2,479,232, Avg throughput = 2,480,688 - For dummy_func(interval=1,sliding_window_size=5)\n",
      "[INFO] fastkafka.benchmark: Throughput = 2,488,960, Avg throughput = 2,498,222 - For dummy_func(interval=1,sliding_window_size=5)\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{'dummy_func': {'count': 15000000,\n",
       "  'last_count': 14892399,\n",
       "  'start': datetime.datetime(2023, 4, 7, 10, 48, 19, 887819),\n",
       "  'last_start': datetime.datetime(2023, 4, 7, 10, 48, 25, 891228),\n",
       "  'history': deque([2532538.0, 2523505.0, 2466875.0, 2479232.0, 2488960.0],\n",
       "        maxlen=5)}}"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "interval = timedelta(seconds=1)\n",
    "sliding_window_size = 5\n",
    "func_name = \"dummy_func\"\n",
    "benchmark_results = dict()\n",
    "\n",
    "n = 15_000_000\n",
    "for i in range(n):\n",
    "    _benchmark(\n",
    "        interval=interval,\n",
    "        sliding_window_size=sliding_window_size,\n",
    "        func_name=func_name,\n",
    "        benchmark_results=benchmark_results,\n",
    "    )\n",
    "\n",
    "display(benchmark_results)\n",
    "\n",
    "assert benchmark_results[func_name][\"count\"] == n\n",
    "assert len(benchmark_results[func_name][\"history\"]) <= sliding_window_size, len(\n",
    "    benchmark_results[func_name][\"history\"]\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e4570e65",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
